package k2paltform.common.util;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Iterator;

public class Configuration extends PropertiesConfiguration {

	public static final Logger LOG = LoggerFactory.getLogger(Configuration.class);

	private Watcher watcher; //for zookeeper watcher

	public Configuration() {
		super();
		setDelimiterParsingDisabled(true);
		watcher = new Watcher();
	}

	public Configuration(String resource) throws ConfigurationException {
		super();
		setDelimiterParsingDisabled(true);
		this.load(resource);
		watcher = new Watcher();
	}

	public Configuration(File resource) throws ConfigurationException {
		super();
		setDelimiterParsingDisabled(true);
		this.load(resource);
		watcher = new Watcher();
	}

	public Configuration(URL resource) throws ConfigurationException {
		super();
		setDelimiterParsingDisabled(true);
		this.load(resource);
		watcher = new Watcher();
	}

	public byte[] toBytes() throws ConfigurationException, IOException {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		this.save(out);
		byte[] bytes = out.toByteArray();
		out.close();
		return bytes;
	}

	public void fromBytes(byte[] bytes) throws ConfigurationException, IOException {
		if(bytes == null) {
			LOG.warn("Bytes Array for init configuration object is null. Configuration Object init fail!");
			return;
		}
		ByteArrayInputStream in = new ByteArrayInputStream(bytes);
		this.load(in);
		in.close();
	}

	public void loadFromZookeeper(ZookeeperService zooService) throws Exception {
		this.fromBytes(zooService.get(Constants.ZOOKEEPER_CONF_PATH));
	}

	public void loadFromZookeeper(String zookeeperUrl, String namespace) throws Exception {
		ZookeeperService zookeeperService = new ZookeeperService(zookeeperUrl, namespace);
		zookeeperService.start();
		loadFromZookeeper(zookeeperService);
		zookeeperService.close();
	}

	public void loadFromZookeeper(String zookeeperUrl) throws Exception {
		loadFromZookeeper(zookeeperUrl, Constants.ZOOKEEPER_NAMESPACE);
	}

	/**
	 * If property does not exist, will create. If exists, will update
	 * @param conf
	 */
	public void addConfiguration(Configuration conf) {
		Iterator<String> keys = conf.getKeys();
		while(keys.hasNext()) {
			String key = keys.next();
			this.setProperty(key, conf.getProperty(key));
		}
	}

	public Watcher getWatcher() {
		return this.watcher;
	}

	class Watcher extends NodeWatcher {

		public Watcher() {
			super();
		}

		@Override
		public void clear() {
		}

		@Override
		public void load(byte[] data) {
			try {
				Configuration tempConf = new Configuration();
				tempConf.fromBytes(data);
				addConfiguration(tempConf);
				tempConf.clear();

				LOG.info("Configurations after update:{}.", Configuration.this.toString());
			} catch (ConfigurationException|IOException e) {
				LOG.error("Error to load configuration object from byte array");
				e.printStackTrace();
			}
		}
	}

	@Override
	public String toString() {
		Iterator<String> it = this.getKeys();
		StringBuffer sb = new StringBuffer();
		sb.append("{");
		while (it.hasNext()) {
			String key = it.next();
			sb.append(key).append("=").append(this.getProperty(key)).append(", ");
		}
		// delete the last comma and space.
		if (sb.length() > 3) {
			sb.deleteCharAt(sb.length() - 2);
			sb.deleteCharAt(sb.length() - 1);
		}
		sb.append("}");

		return sb.toString();
	}

	public static void main(String[] args) throws Exception {
		if(args.length < 2) {
			System.out.println("Usage: Configuration ZookeeperConfFilePath DataPlatformConfFilePath");
			System.exit(1);
		}
		Configuration zookeeperConf = new Configuration(args[0]);
		ZookeeperService zs = new ZookeeperService(zookeeperConf);
		zs.start();
		Configuration dataplatformConf = new Configuration(args[1]);
		zs.create(Constants.ZOOKEEPER_CONF_PATH, dataplatformConf.toBytes());
		System.out.println("DataPlatform Configurations in " + args[1] + " have been stored in Zookeeper");

		if (!zs.checkExists(Constants.ZOOKEEPER_ASSET_PATH)) {
			zs.create(Constants.ZOOKEEPER_ASSET_PATH, String.valueOf(System.currentTimeMillis()).getBytes());
			System.out.println("Created asset zk node");
		}
		if (!zs.checkExists(Constants.ZOOKEEPER_SCHEMA_PATH)) {
			zs.create(Constants.ZOOKEEPER_SCHEMA_PATH, String.valueOf(System.currentTimeMillis()).getBytes());
			System.out.println("Created schema zk node");
		}
		if (!zs.checkExists(Constants.ZOOKEEPER_SCHEMA_RELOAD_PATH)) {
			zs.create(Constants.ZOOKEEPER_SCHEMA_RELOAD_PATH, String.valueOf(System.currentTimeMillis()).getBytes());
			System.out.println("Created schema zk reload node");
		}

		zs.close();
	}
}